一场雨

motan 服务注册和暴露

初始化

/**
 * Initialization step: runs the three configuration checks in a fixed order
 * (basic config, then export, then registry).
 *
 * @throws Exception declared by the overridden signature
 */
@Override
public void afterPropertiesSet() throws Exception {

    // Note: basicConfig must be configured first, because the other settings may depend on it.
    checkAndConfigBasicConfig();
    checkAndConfigExport();
    checkAndConfigRegistry();

}

配置部分

//checkAndConfigBasicConfig();
<motan:basicService export="demoMotan:8002"
                    group="motan-demo-rpc" accessLog="false" shareChannel="true" module="motan-demo-rpc"
                    application="myMotanDemo" registry="registry" id="serviceBasicConfig"/>


//checkAndConfigExport();
<!-- 协议配置。为防止多个业务配置冲突,推荐使用id表示具体协议。-->
<motan:protocol id="demoMotan" default="true" name="motan"
                maxServerConnection="80000" maxContentLength="1048576"
                maxWorkerThread="800" minWorkerThread="20"/>


//checkAndConfigRegistry();
<motan:registry regProtocol="zookeeper" name="registry" address="127.0.0.1:2181" connectTimeout="2000"/>

执行export()

/**
 * Triggers {@code export()} when the context-refreshed event arrives,
 * unless the exported flag is already set.
 *
 * @param event the context-refreshed event (not inspected here)
 */
@Override
public void onApplicationEvent(ContextRefreshedEvent event) {
    boolean alreadyExported = getExported().get();
    if (alreadyExported) {
        // Already exported: nothing left to do.
        return;
    }
    export();
}

// Call chain traced by this walkthrough after export() is invoked, outermost call first.
doExport();

doExport(protocolConfig, port, registryUrls);

exporters.add(configHandler.export(interfaceClass, ref, urls));// the key step

// interfaceClass : interface com.weibo.motan.demo.service.MotanDemoService

// ref : com.weibo.motan.demo.server.MotanDemoServiceImpl@6986852

// urls : the key argument; SimpleConfigHandler.export receives it as registryUrls,
//        so presumably these are the registry URLs (TODO confirm)

Alt text

Alt text

SimpleConfigHandler 中:

/**
 * Exports {@code ref} through the protocol named in the service URL and then
 * registers that service URL with the given registries.
 *
 * @param interfaceClass the service interface being exported
 * @param ref            the service implementation instance
 * @param registryUrls   registry URLs; the first one carries the encoded service URL
 *                       in its embed parameter
 * @return the exporter produced by the protocol
 */
@Override
public <T> Exporter<T> export(Class<T> interfaceClass, T ref, List<URL> registryUrls) {

    // The service's own URL is stored, URL-encoded, in the embed parameter of the first registry URL.
    URL firstRegistryUrl = registryUrls.get(0);
    String encodedServiceUrl = firstRegistryUrl.getParameter(URLParamType.embed.getName());
    // Example decoded value observed in this walkthrough:
    //   motan://10.226.110.67:8001/com.weibo.motan.demo.service.MotanDemoService?maxContentLength=1048576&module=motan-demo-rpc&
    //   nodeType=service&accessLog=false&minWorkerThread=20&protocol=motan&isDefault=true&application=myMotanDemo&
    //   maxWorkerThread=800&shareChannel=true&refreshTimestamp=1487644852350
    //   &id=com.weibo.api.motan.config.springsupport.ServiceConfigBean&export=demoMotan:8001&maxServerConnection=80000&group=motan-demo-rpc&
    URL serviceUrl = URL.valueOf(StringTools.urlDecode(encodedServiceUrl));

    // Protocol name read from the service URL; it is "motan" in this example.
    String protocolName = serviceUrl.getParameter(URLParamType.protocol.getName(), URLParamType.protocol.getValue());

    // Look up the protocol extension by name and wrap it in the filter decorator.
    Protocol rawProtocol = ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(protocolName);
    Protocol filteredProtocol = new ProtocolFilterDecorator(rawProtocol);

    // The protocol performs the actual export of the provider.
    Exporter<T> exporter = filteredProtocol.export(new DefaultProvider<T>(ref, serviceUrl, interfaceClass), serviceUrl);

    // Register the service with the registries; this example uses ZooKeeper (see the zk registry implementation).
    register(registryUrls, serviceUrl);

    return exporter;
}

Alt text

Alt text

Alt text

Alt text

/**
 * Creates an exporter for the given provider, initializes it, and records it
 * in {@code exporterMap} under the URL's protocol key.
 *
 * @param provider the provider to export; must not be null
 * @param url      the service URL; must not be null
 * @return the newly created and initialized exporter
 * @throws MotanFrameworkException if {@code url} or {@code provider} is null, or if an
 *                                 exporter is already recorded for the protocol key
 */
@SuppressWarnings("unchecked")
@Override
public <T> Exporter<T> export(Provider<T> provider, URL url) {
    final String owner = this.getClass().getSimpleName();

    if (url == null) {
        throw new MotanFrameworkException(owner + " export Error: url is null",
                MotanErrorMsgConstant.FRAMEWORK_INIT_ERROR);
    }
    if (provider == null) {
        throw new MotanFrameworkException(owner + " export Error: provider is null, url=" + url,
                MotanErrorMsgConstant.FRAMEWORK_INIT_ERROR);
    }

    final String protocolKey = MotanFrameworkUtil.getProtocolKey(url);

    synchronized (exporterMap) {
        // Existence check first: a protocolKey maps to exactly one exporter.
        Exporter<T> existing = (Exporter<T>) exporterMap.get(protocolKey);
        if (existing != null) {
            throw new MotanFrameworkException(owner + " export Error: service already exist, url=" + url,
                    MotanErrorMsgConstant.FRAMEWORK_INIT_ERROR);
        }

        // Build the exporter from provider and url; per this walkthrough this also
        // creates the communication server (netty-based implementation).
        Exporter<T> created = createExporter(provider, url);

        // Initialize; per this walkthrough this includes opening the netty server (open()).
        created.init();

        exporterMap.put(protocolKey, created);

        LoggerUtil.info(owner + " export Success: url=" + url);

        return created;
    }
}

DefaultRpcProtocol 中 DefaultRpcExporter 的构造方法:

/**
 * Builds the exporter: passes provider and url to the superclass, creates the
 * request router for the url, resolves the endpoint factory extension named by
 * the url's endpointFactory parameter, and asks that factory to create the server.
 *
 * @param provider the provider being exported
 * @param url      the service URL
 */
public DefaultRpcExporter(Provider<T> provider, URL url) {

    // Constructor. Original note: "what gets handed out is the provider" — TODO confirm what this refers to.

    super(provider, url);

    ProviderMessageRouter requestRouter = initRequestRouter(url);

    // Endpoint factory name comes from the url parameter; the second argument is presumably the default value.
    endpointFactory = ExtensionLoader.getExtensionLoader(EndpointFactory.class).
                      getExtension(url.getParameter(URLParamType.endpointFactory.getName(),
                                    URLParamType.endpointFactory.getValue()));

    // The server is created here with the request router as its message handler.
    server = endpointFactory.createServer(url, requestRouter);

}

Alt text

AbstractEndpointFactory中:

/**
 * Creates a server for the url, or returns the server already recorded for the
 * same ip:port when the url asks for a shared channel.
 *
 * @param url            the service URL
 * @param messageHandler the handler for incoming messages; it is wrapped by the heartbeat factory first
 * @return a newly created server, or the existing shared server for this ip:port
 * @throws MotanFrameworkException if a shared server exists for the ip:port but the
 *                                 compatibility check against the new url fails
 */
@Override
public Server createServer(URL url, MessageHandler messageHandler) {
    HeartbeatFactory heartbeatFactory = getHeartbeatFactory(url);
    messageHandler = heartbeatFactory.wrapMessageHandler(messageHandler);

    synchronized (ipPort2ServerShareChannel) {
        String ipPort = url.getServerPortStr();
        String protocolKey = MotanFrameworkUtil.getProtocolKey(url);

        boolean shareChannel =
                url.getBooleanParameter(URLParamType.shareChannel.getName(), URLParamType.shareChannel.getBooleanValue());

        if (!shareChannel) { // exclusive port (not shared)
            LoggerUtil.info(this.getClass().getSimpleName() + " create no_share_channel server: url={}", url);

            // If the port is already in use, binding this server will raise an exception.
            return innerCreateServer(url, messageHandler);
        }

        LoggerUtil.info(this.getClass().getSimpleName() + " create share_channel server: url={}", url);

        Server server = ipPort2ServerShareChannel.get(ipPort);

        if (server != null) {
            // can't share service channel
            if (!MotanFrameworkUtil.checkIfCanShallServiceChannel(server.getUrl(), url)) {
                throw new MotanFrameworkException(
                        "Service export Error: share channel but some config param is different, protocol or codec or serialize or maxContentLength or maxServerConnection or maxWorkerThread or heartbeatFactory, source="
                                + server.getUrl() + " target=" + url, MotanErrorMsgConstant.FRAMEWORK_EXPORT_ERROR);
            }

            // Reuse the existing server and record this protocolKey against it.
            saveEndpoint2Urls(server2UrlsShareChannel, server, protocolKey);

            return server;
        }

        // No server yet for this ip:port: create one from a copy of the url.
        url = url.createCopy();
        url.setPath(""); // Shared server port: several interfaces live on it, so the path is set to empty.
        server = innerCreateServer(url, messageHandler);// per this walkthrough, this creates a netty server

        ipPort2ServerShareChannel.put(ipPort, server);
        saveEndpoint2Urls(server2UrlsShareChannel, server, protocolKey);

        return server;
    }
}

afterExport();

在集合中添加服务;

以上是一个大致的流程:先由协议导出服务并建立 netty 传输服务,再将暴露的服务注册到注册中心(本例为 zookeeper)。